// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.

use crate::io::TcpStreamResource;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
use crate::NetPermissions;
use deno_core::error::bad_resource;
use deno_core::error::custom_error;
use deno_core::error::generic_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::CancelFuture;

use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use serde::Deserialize;
use serde::Serialize;
use socket2::Domain;
use socket2::Protocol;
use socket2::Socket;
use socket2::Type;
use std::borrow::Cow;
use std::cell::RefCell;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::rc::Rc;
use std::str::FromStr;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
use trust_dns_proto::rr::rdata::caa::Value;
use trust_dns_proto::rr::record_data::RData;
use trust_dns_proto::rr::record_type::RecordType;
use trust_dns_resolver::config::NameServerConfigGroup;
use trust_dns_resolver::config::ResolverConfig;
use trust_dns_resolver::config::ResolverOpts;
use trust_dns_resolver::error::ResolveErrorKind;
use trust_dns_resolver::system_conf;
use trust_dns_resolver::AsyncResolver;

#[derive(Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct TlsHandshakeInfo {
  pub alpn_protocol: Option<ByteString>,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct IpAddr {
  pub hostname: String,
  pub port: u16,
}

impl From<SocketAddr> for IpAddr {
  fn from(addr: SocketAddr) -> Self {
    Self {
      hostname: addr.ip().to_string(),
      port: addr.port(),
    }
  }
}
pub(crate) fn accept_err(e: std::io::Error) -> AnyError {
  // FIXME(bartlomieju): compatibility with current JS implementation
  if let std::io::ErrorKind::Interrupted = e.kind() {
    bad_resource("Listener has been closed")
  } else {
    e.into()
  }
}

/// recv 一个TcpStream 创建TcpStreamResource
/// 这里本来是要从 resource_table获取 TcpListener 在这里被系统接管了
#[op]
async fn op_net_accept_tcp(state: Rc<RefCell<OpState>>, _rid: ResourceId) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> {
  let acceptors_rx = {
    //这里不能直接take 否则下次来的时候没办法再次获取 已经被回收
    let mut op_state = state.borrow_mut();
    op_state.borrow_mut::<async_channel::Receiver<TcpStream>>().clone()
  };
  let tcp_stream = acceptors_rx.recv().await?;
  let local_addr = tcp_stream.local_addr()?;
  let remote_addr = tcp_stream.peer_addr()?;
  let mut state = state.borrow_mut();
  let rid = state.resource_table.add(TcpStreamResource::new(tcp_stream.into_split()));
  Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr)))
}

#[op]
async fn op_net_recv_udp(state: Rc<RefCell<OpState>>, rid: ResourceId, mut buf: ZeroCopyBuf) -> Result<(usize, IpAddr), AnyError> {
  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
  let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
  let (nread, remote_addr) = socket.recv_from(&mut buf).try_or_cancel(cancel_handle).await?;
  Ok((nread, IpAddr::from(remote_addr)))
}

#[op]
async fn op_net_send_udp<NP>(state: Rc<RefCell<OpState>>, rid: ResourceId, addr: IpAddr, zero_copy: ZeroCopyBuf) -> Result<usize, AnyError>
where
  NP: NetPermissions + 'static,
{
  {
    let mut s = state.borrow_mut();
    s.borrow_mut::<NP>().check_net(&(&addr.hostname, Some(addr.port)), "Deno.DatagramConn.send()")?;
  }
  let addr = resolve_addr(&addr.hostname, addr.port).await?.next().ok_or_else(|| generic_error("No resolved address found"))?;

  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
  let nwritten = socket.send_to(&zero_copy, &addr).await?;

  Ok(nwritten)
}

#[op]
async fn op_net_join_multi_v4_udp<NP>(state: Rc<RefCell<OpState>>, rid: ResourceId, address: String, multi_interface: String) -> Result<(), AnyError>
where
  NP: NetPermissions + 'static,
{
  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

  let addr = Ipv4Addr::from_str(address.as_str())?;
  let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?;

  socket.join_multicast_v4(addr, interface_addr)?;

  Ok(())
}

#[op]
async fn op_net_join_multi_v6_udp<NP>(state: Rc<RefCell<OpState>>, rid: ResourceId, address: String, multi_interface: u32) -> Result<(), AnyError>
where
  NP: NetPermissions + 'static,
{
  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

  let addr = Ipv6Addr::from_str(address.as_str())?;

  socket.join_multicast_v6(&addr, multi_interface)?;

  Ok(())
}

#[op]
async fn op_net_leave_multi_v4_udp<NP>(state: Rc<RefCell<OpState>>, rid: ResourceId, address: String, multi_interface: String) -> Result<(), AnyError>
where
  NP: NetPermissions + 'static,
{
  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

  let addr = Ipv4Addr::from_str(address.as_str())?;
  let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?;

  socket.leave_multicast_v4(addr, interface_addr)?;

  Ok(())
}

#[op]
async fn op_net_leave_multi_v6_udp<NP>(state: Rc<RefCell<OpState>>, rid: ResourceId, address: String, multi_interface: u32) -> Result<(), AnyError>
where
  NP: NetPermissions + 'static,
{
  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

  let addr = Ipv6Addr::from_str(address.as_str())?;

  socket.leave_multicast_v6(&addr, multi_interface)?;

  Ok(())
}

#[op]
async fn op_net_set_multi_loopback_udp<NP>(state: Rc<RefCell<OpState>>, rid: ResourceId, is_v4_membership: bool, loopback: bool) -> Result<(), AnyError>
where
  NP: NetPermissions + 'static,
{
  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

  if is_v4_membership {
    socket.set_multicast_loop_v4(loopback)?
  } else {
    socket.set_multicast_loop_v6(loopback)?;
  }

  Ok(())
}

#[op]
async fn op_net_set_multi_ttl_udp<NP>(state: Rc<RefCell<OpState>>, rid: ResourceId, ttl: u32) -> Result<(), AnyError>
where
  NP: NetPermissions + 'static,
{
  let resource = state.borrow_mut().resource_table.get::<UdpSocketResource>(rid).map_err(|_| bad_resource("Socket has been closed"))?;
  let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;

  socket.set_multicast_ttl_v4(ttl)?;

  Ok(())
}

#[op]
pub async fn op_net_connect_tcp<NP>(state: Rc<RefCell<OpState>>, addr: IpAddr) -> Result<(ResourceId, IpAddr, IpAddr), AnyError>
where
  NP: NetPermissions + 'static,
{
  {
    let mut state_ = state.borrow_mut();
    state_.borrow_mut::<NP>().check_net(&(&addr.hostname, Some(addr.port)), "Deno.connect()")?;
  }

  let addr = resolve_addr(&addr.hostname, addr.port).await?.next().ok_or_else(|| generic_error("No resolved address found"))?;
  let tcp_stream = TcpStream::connect(&addr).await?;
  let local_addr = tcp_stream.local_addr()?;
  let remote_addr = tcp_stream.peer_addr()?;

  let mut state_ = state.borrow_mut();
  let rid = state_.resource_table.add(TcpStreamResource::new(tcp_stream.into_split()));

  Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr)))
}

pub struct TcpListenerResource {
  pub listener: AsyncRefCell<TcpListener>,
  pub cancel: CancelHandle,
}

impl Resource for TcpListenerResource {
  fn name(&self) -> Cow<str> {
    "tcpListener".into()
  }

  fn close(self: Rc<Self>) {
    self.cancel.cancel();
  }
}

struct UdpSocketResource {
  socket: AsyncRefCell<UdpSocket>,
  cancel: CancelHandle,
}

impl Resource for UdpSocketResource {
  fn name(&self) -> Cow<str> {
    "udpSocket".into()
  }

  fn close(self: Rc<Self>) {
    self.cancel.cancel()
  }
}
///此方法不做任何处理 接管原有的 op_net_listen_tcp方法
///直接返回ResourceId 和IpAddr
/// 如果想查看原方法实现 请查看deno 1.41源代码 op_net_listen_tcp方法的实现
#[op]
fn op_net_listen_tcp<NP>(_state: &mut OpState, addr: IpAddr, _reuse_port: bool) -> Result<(ResourceId, IpAddr), AnyError>
where
  NP: NetPermissions + 'static,
{
  Ok((1, addr))
}

fn net_listen_udp<NP>(state: &mut OpState, addr: IpAddr, reuse_address: bool, loopback: bool) -> Result<(ResourceId, IpAddr), AnyError>
where
  NP: NetPermissions + 'static,
{
  state.borrow_mut::<NP>().check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenDatagram()")?;
  let addr = resolve_addr_sync(&addr.hostname, addr.port)?.next().ok_or_else(|| generic_error("No resolved address found"))?;

  let domain = if addr.is_ipv4() { Domain::IPV4 } else { Domain::IPV6 };
  let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
  if reuse_address {
    // This logic is taken from libuv:
    //
    // On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
    // refinements for programs that use multicast.
    //
    // Linux as of 3.9 has a SO_REUSEPORT socket option but with semantics that
    // are different from the BSDs: it _shares_ the port rather than steal it
    // from the current listener. While useful, it's not something we can
    // emulate on other platforms so we don't enable it.
    #[cfg(any(target_os = "windows", target_os = "linux"))]
    socket_tmp.set_reuse_address(true)?;
    #[cfg(all(unix, not(target_os = "linux")))]
    socket_tmp.set_reuse_port(true)?;
  }
  let socket_addr = socket2::SockAddr::from(addr);
  socket_tmp.bind(&socket_addr)?;
  socket_tmp.set_nonblocking(true)?;

  // Enable messages to be sent to the broadcast address (255.255.255.255) by default
  socket_tmp.set_broadcast(true)?;

  if domain == Domain::IPV4 {
    socket_tmp.set_multicast_loop_v4(loopback)?;
  } else {
    socket_tmp.set_multicast_loop_v6(loopback)?;
  }

  let std_socket: std::net::UdpSocket = socket_tmp.into();

  let socket = UdpSocket::from_std(std_socket)?;
  let local_addr = socket.local_addr()?;
  let socket_resource = UdpSocketResource {
    socket: AsyncRefCell::new(socket),
    cancel: Default::default(),
  };
  let rid = state.resource_table.add(socket_resource);

  Ok((rid, IpAddr::from(local_addr)))
}

#[op]
fn op_net_listen_udp<NP>(state: &mut OpState, addr: IpAddr, reuse_address: bool, loopback: bool) -> Result<(ResourceId, IpAddr), AnyError>
where
  NP: NetPermissions + 'static,
{
  super::check_unstable(state, "Deno.listenDatagram");
  net_listen_udp::<NP>(state, addr, reuse_address, loopback)
}

#[op]
fn op_node_unstable_net_listen_udp<NP>(state: &mut OpState, addr: IpAddr, reuse_address: bool, loopback: bool) -> Result<(ResourceId, IpAddr), AnyError>
where
  NP: NetPermissions + 'static,
{
  net_listen_udp::<NP>(state, addr, reuse_address, loopback)
}

#[derive(Serialize, Eq, PartialEq, Debug)]
#[serde(untagged)]
pub enum DnsReturnRecord {
  A(String),
  Aaaa(String),
  Aname(String),
  Caa {
    critical: bool,
    tag: String,
    value: String,
  },
  Cname(String),
  Mx {
    preference: u16,
    exchange: String,
  },
  Naptr {
    order: u16,
    preference: u16,
    flags: String,
    services: String,
    regexp: String,
    replacement: String,
  },
  Ns(String),
  Ptr(String),
  Soa {
    mname: String,
    rname: String,
    serial: u32,
    refresh: i32,
    retry: i32,
    expire: i32,
    minimum: u32,
  },
  Srv {
    priority: u16,
    weight: u16,
    port: u16,
    target: String,
  },
  Txt(Vec<String>),
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResolveAddrArgs {
  cancel_rid: Option<ResourceId>,
  query: String,
  record_type: RecordType,
  options: Option<ResolveDnsOption>,
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResolveDnsOption {
  name_server: Option<NameServer>,
}

fn default_port() -> u16 {
  53
}

#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NameServer {
  ip_addr: String,
  #[serde(default = "default_port")]
  port: u16,
}

#[op]
pub async fn op_dns_resolve<NP>(state: Rc<RefCell<OpState>>, args: ResolveAddrArgs) -> Result<Vec<DnsReturnRecord>, AnyError>
where
  NP: NetPermissions + 'static,
{
  let ResolveAddrArgs {
    query,
    record_type,
    options,
    cancel_rid,
  } = args;

  let (config, opts) = if let Some(name_server) = options.as_ref().and_then(|o| o.name_server.as_ref()) {
    let group = NameServerConfigGroup::from_ips_clear(&[name_server.ip_addr.parse()?], name_server.port, true);
    (ResolverConfig::from_parts(None, vec![], group), ResolverOpts::default())
  } else {
    system_conf::read_system_conf()?
  };

  {
    let mut s = state.borrow_mut();
    let perm = s.borrow_mut::<NP>();

    // Checks permission against the name servers which will be actually queried.
    for ns in config.name_servers() {
      let socker_addr = &ns.socket_addr;
      let ip = socker_addr.ip().to_string();
      let port = socker_addr.port();
      perm.check_net(&(ip, Some(port)), "Deno.resolveDns()")?;
    }
  }

  let resolver = AsyncResolver::tokio(config, opts)?;

  let lookup_fut = resolver.lookup(query, record_type);

  let cancel_handle = cancel_rid.and_then(|rid| state.borrow_mut().resource_table.get::<CancelHandle>(rid).ok());

  let lookup = if let Some(cancel_handle) = cancel_handle {
    let lookup_rv = lookup_fut.or_cancel(cancel_handle).await;

    if let Some(cancel_rid) = cancel_rid {
      state.borrow_mut().resource_table.close(cancel_rid).ok();
    };

    lookup_rv?
  } else {
    lookup_fut.await
  };

  lookup
    .map_err(|e| {
      let message = format!("{e}");
      match e.kind() {
        ResolveErrorKind::NoRecordsFound { .. } => custom_error("NotFound", message),
        ResolveErrorKind::Message("No connections available") => custom_error("NotConnected", message),
        ResolveErrorKind::Timeout => custom_error("TimedOut", message),
        _ => generic_error(message),
      }
    })?
    .iter()
    .filter_map(|rdata| rdata_to_return_record(record_type)(rdata).transpose())
    .collect::<Result<Vec<DnsReturnRecord>, AnyError>>()
}

#[op]
pub fn op_set_nodelay(state: &mut OpState, rid: ResourceId, nodelay: bool) -> Result<(), AnyError> {
  let resource: Rc<TcpStreamResource> = state.resource_table.get::<TcpStreamResource>(rid)?;
  resource.set_nodelay(nodelay)
}

#[op]
pub fn op_set_keepalive(state: &mut OpState, rid: ResourceId, keepalive: bool) -> Result<(), AnyError> {
  let resource: Rc<TcpStreamResource> = state.resource_table.get::<TcpStreamResource>(rid)?;
  resource.set_keepalive(keepalive)
}

fn rdata_to_return_record(ty: RecordType) -> impl Fn(&RData) -> Result<Option<DnsReturnRecord>, AnyError> {
  use RecordType::*;
  move |r: &RData| -> Result<Option<DnsReturnRecord>, AnyError> {
    let record = match ty {
      A => r.as_a().map(ToString::to_string).map(DnsReturnRecord::A),
      AAAA => r.as_aaaa().map(ToString::to_string).map(DnsReturnRecord::Aaaa),
      ANAME => r.as_aname().map(ToString::to_string).map(DnsReturnRecord::Aname),
      CAA => r.as_caa().map(|caa| DnsReturnRecord::Caa {
        critical: caa.issuer_critical(),
        tag: caa.tag().to_string(),
        value: match caa.value() {
          Value::Issuer(name, key_values) => {
            let mut s = String::new();

            if let Some(name) = name {
              s.push_str(&name.to_string());
            } else if name.is_none() && key_values.is_empty() {
              s.push(';');
            }

            for key_value in key_values {
              s.push_str("; ");
              s.push_str(&key_value.to_string());
            }

            s
          }
          Value::Url(url) => url.to_string(),
          Value::Unknown(data) => String::from_utf8(data.to_vec()).unwrap(),
        },
      }),
      CNAME => r.as_cname().map(ToString::to_string).map(DnsReturnRecord::Cname),
      MX => r.as_mx().map(|mx| DnsReturnRecord::Mx {
        preference: mx.preference(),
        exchange: mx.exchange().to_string(),
      }),
      NAPTR => r.as_naptr().map(|naptr| DnsReturnRecord::Naptr {
        order: naptr.order(),
        preference: naptr.preference(),
        flags: String::from_utf8(naptr.flags().to_vec()).unwrap(),
        services: String::from_utf8(naptr.services().to_vec()).unwrap(),
        regexp: String::from_utf8(naptr.regexp().to_vec()).unwrap(),
        replacement: naptr.replacement().to_string(),
      }),
      NS => r.as_ns().map(ToString::to_string).map(DnsReturnRecord::Ns),
      PTR => r.as_ptr().map(ToString::to_string).map(DnsReturnRecord::Ptr),
      SOA => r.as_soa().map(|soa| DnsReturnRecord::Soa {
        mname: soa.mname().to_string(),
        rname: soa.rname().to_string(),
        serial: soa.serial(),
        refresh: soa.refresh(),
        retry: soa.retry(),
        expire: soa.expire(),
        minimum: soa.minimum(),
      }),
      SRV => r.as_srv().map(|srv| DnsReturnRecord::Srv {
        priority: srv.priority(),
        weight: srv.weight(),
        port: srv.port(),
        target: srv.target().to_string(),
      }),
      TXT => r.as_txt().map(|txt| {
        let texts: Vec<String> = txt
          .iter()
          .map(|bytes| {
            // Tries to parse these bytes as Latin-1
            bytes.iter().map(|&b| b as char).collect::<String>()
          })
          .collect();
        DnsReturnRecord::Txt(texts)
      }),
      _ => return Err(custom_error("NotSupported", "Provided record type is not supported")),
    };
    Ok(record)
  }
}
